Page 1 of 1

Processing requests in asynchronous mode

Posted: 06.05.2011, 09:59
by Pyh
Since I could not find an example of running queries asynchronously, the only solution was to import the missing libpq functions PQgetCancel, PQfreeCancel and PQcancel and use them in a thread.
The patch was written for version 6.6.4, but 6.6.6 and 7.0 do not differ much.

Code: Select all

ZPlainPostgreSql7.pas, ZPlainPostgreSql8.pas:

  after
    TPQconsumeInput
  add
    // Function-pointer types for the libpq query-cancellation entry points.
    // NOTE(review): libpq declares these as taking/returning "PGcancel *";
    // confirm that PGcancel is declared as a pointer type in this unit.

    // Obtains a cancel object for the given connection.
    TPQgetCancel     = function(Handle: PPGconn): PGcancel; cdecl;
    // Releases a cancel object obtained through TPQgetCancel.
    TPQfreeCancel    = procedure(Canc: PGcancel); cdecl;
    // Requests cancellation; Buffer/BufSize describe a caller-supplied error
    // buffer (the example below treats a result other than 1 as failure).
    TPQcancel        = function(Canc: PGcancel; Buffer: PChar; BufSize: Integer): Integer; cdecl;

  after
    PQconsumeInput:  TPQconsumeInput;
  add
    // Entry points for query cancellation; filled in by the GetAddress calls
    // when the client library is loaded.
    PQgetCancel:     TPQgetCancel;
    PQfreeCancel:    TPQfreeCancel;
    PQcancel:        TPQcancel;

  after
    @PQconsumeInput := GetAddress('PQconsumeInput');
  add
    // Use the same "@Variable := GetAddress(...)" form as the existing
    // PQconsumeInput line so the untyped pointer is stored in the procedural
    // variable itself (instead of relying on an implicit conversion).
    @PQgetCancel    := GetAddress('PQgetCancel');
    @PQfreeCancel   := GetAddress('PQfreeCancel');
    @PQcancel       := GetAddress('PQcancel');

    
ZPlainPostgreSqlDriver.pas:

  after
    PZPostgreSQLResult = Pointer;
  add type PZPostgreSQLCancel:
    PZPostgreSQLCancel = Pointer;

  IZPostgreSQLPlainDriver, TZPostgreSQL7PlainDriver, TZPostgreSQL8PlainDriver
  after
      function ConsumeInput
  add functions:
      function GetCancel(Handle: PZPostgreSQLConnect): PZPostgreSQLCancel;
      procedure FreeCancel( Canc: PZPostgreSQLCancel);
      function Cancel( Canc: PZPostgreSQLCancel; Buffer: PChar;
        Length: Integer): Integer;

      { Returns a cancel object for the connection Handle, or nil when the
        loaded client library does not provide PQgetCancel (the cancel API was
        only added in PostgreSQL 8.0, so a 7.x libpq may lack it).
        A non-nil result must be released with FreeCancel. }
      function TZPostgreSQL7PlainDriver.GetCancel(
        Handle: PZPostgreSQLConnect): PZPostgreSQLCancel;
      begin
        // presumably the pointer stays nil when the symbol could not be resolved
        if Assigned( ZPlainPostgreSql7.PQgetCancel)
        then Result := ZPlainPostgreSql7.PQgetCancel( Handle)
        else Result := nil;
      end;

      { Releases a cancel object returned by GetCancel. Does nothing when the
        loaded client library does not provide PQfreeCancel (pre-8.0 libpq). }
      procedure TZPostgreSQL7PlainDriver.FreeCancel(Canc: PZPostgreSQLCancel);
      begin
        if Assigned( ZPlainPostgreSql7.PQfreeCancel)
        then ZPlainPostgreSql7.PQfreeCancel( Canc);
      end;

      { Asks the server to cancel the command running on the connection that
        Canc was obtained from.
        Buffer/Length describe a caller-supplied buffer for an error message.
        Returns the PQcancel result (callers treat 1 as success). When the
        loaded client library does not provide PQcancel (pre-8.0 libpq), the
        function returns 0 and leaves an empty string in Buffer. }
      function TZPostgreSQL7PlainDriver.Cancel(Canc: PZPostgreSQLCancel;
        Buffer: PChar; Length: Integer): Integer;
      begin
        if Assigned( ZPlainPostgreSql7.PQcancel)
        then Result := ZPlainPostgreSql7.PQcancel( Canc, Buffer, Length)
        else begin
          // no cancel support: report failure without touching invalid memory
          if (Buffer <> nil) and (Length > 0)
          then Buffer^ := #0;
          Result := 0;
        end;
      end;

              
      { Returns a cancel object for the connection Handle by delegating to the
        PQgetCancel entry point of the ZPlainPostgreSql8 unit.
        The result is meant to be released with FreeCancel. }
      function TZPostgreSQL8PlainDriver.GetCancel(
        Handle: PZPostgreSQLConnect): PZPostgreSQLCancel;
      begin
        Result := ZPlainPostgreSql8.PQgetCancel(Handle);
      end;

      { Releases a cancel object by delegating to the PQfreeCancel entry point
        of the ZPlainPostgreSql8 unit. }
      procedure TZPostgreSQL8PlainDriver.FreeCancel(Canc: PZPostgreSQLCancel);
      begin
        ZPlainPostgreSql8.PQfreeCancel( Canc);
      end;

      { Requests cancellation through the PQcancel entry point of the
        ZPlainPostgreSql8 unit.
        Buffer/Length describe a caller-supplied buffer for an error message;
        the PQcancel return value is passed through unchanged. }
      function TZPostgreSQL8PlainDriver.Cancel(Canc: PZPostgreSQLCancel;
        Buffer: PChar; Length: Integer): Integer;
      begin
        Result := ZPlainPostgreSql8.PQcancel( Canc, Buffer, Length);
      end;

Example of use:

Code: Select all


unit SQLWaitThread;

interface

uses Classes, ZConnection, ZPlainPostgreSqlDriver, ZDbcPostgreSql;

type

  { Worker thread that sends one SQL text to PostgreSQL in asynchronous mode,
    polls for completion and can cancel the running command when the thread
    is terminated.

    Usage: create the thread, call SetConnParams, assign SQL, then let the
    thread run. StateStr receives server notices; QueryResult holds the
    fetched rows as text once Execute has finished. }
  TSQLWaitThread = class( TThread)
  private
    FZConnection: TZConnection;        // connection owned by this thread
    FDriver: IZPostgreSQLPlainDriver;  // plain driver of FZConnection
    FHandle: PZPostgreSQLConnect;      // raw connection handle of FZConnection
    FSQL: string;
    FStateStr: string;
    FQueryResult: string;              // text built by Execute from the result sets
    procedure SetStateStr( const aStateStr: string);
  protected
    procedure Execute; override;
    // Raises an Exception that combines aText with the driver's error message.
    procedure PGException( const aText: string);
  public
    // TThread constructors are not virtual, so this one must not be
    // marked "override".
    constructor Create;
    destructor Destroy; override;

    // Configures and opens the connection and installs the notice processor.
    procedure SetConnParams( aProtocol: string; aHostName: string; aPort: integer;
      aDatabase: string; aUser: string; aPassword: string);
    property SQL: string read FSQL write FSQL;
    property StateStr: string read FStateStr write SetStateStr;
    // Rows fetched by Execute: fields separated by #9, rows ended by #13#10.
    property QueryResult: string read FQueryResult;
  end;

implementation

uses SysUtils;

{ Notice callback handed to the plain driver by TSQLWaitThread.SetConnParams.
  aArg is the TSQLWaitThread instance that was passed at registration time;
  the notice text is stored in its StateStr property. }
procedure SQLWaitThread_NoticeProcessor( aArg: Pointer; aMessage: PChar); cdecl;
begin
  TSQLWaitThread( aArg).StateStr := aMessage;
end;

{ Creates the thread in suspended state together with its own connection
  object.

  The thread must not run before SetConnParams has been called and SQL has
  been assigned, because Execute uses the driver and handle obtained there.
  Start it explicitly afterwards (Start, or Resume on older Delphi versions). }
constructor TSQLWaitThread.Create;
begin
  // Suspended: otherwise Execute could run before the connection exists.
  inherited Create( True);

  FZConnection := TZConnection.Create( nil);
end;

{ Stops the worker and then releases the connection.

  TThread.Destroy terminates the thread and waits for Execute to return, so
  it is called first: freeing the connection while Execute may still be
  using its handle would leave the worker with a dangling connection. }
destructor TSQLWaitThread.Destroy;
begin
  inherited Destroy;

  // Safe now: Execute has finished. Fields stay valid until the outermost
  // destructor returns.
  FHandle := nil;
  FDriver := nil;
  FreeAndNil( FZConnection);
end;

{ Applies the connection settings, opens the connection and caches what the
  worker needs for asynchronous access: the plain driver (FDriver) and the
  raw connection handle (FHandle). Server notices are routed to
  SQLWaitThread_NoticeProcessor with this thread as callback argument. }
procedure TSQLWaitThread.SetConnParams(aProtocol, aHostName: string;
  aPort: integer; aDatabase, aUser, aPassword: string);
var
  lPgConnection: IZPostgreSQLConnection;
begin
  FZConnection.Protocol := aProtocol;
  FZConnection.HostName := aHostName;
  FZConnection.Port := aPort;
  FZConnection.Database := aDatabase;
  FZConnection.User := aUser;
  FZConnection.Password := aPassword;
  FZConnection.Connect;

  // The PostgreSQL-specific interface exposes the driver and the raw handle.
  lPgConnection := FZConnection.DbcConnection as IZPostgreSQLConnection;
  FDriver := lPgConnection.GetPlainDriver;
  FHandle := lPgConnection.GetConnectionHandle;
  FDriver.SetNoticeProcessor( FHandle, SQLWaitThread_NoticeProcessor, Self);
end;

{ Setter of StateStr: stores the new text only when it differs from the
  current one. The change notification is still commented out. }
procedure TSQLWaitThread.SetStateStr( const aStateStr: string);
begin
  if FStateStr <> aStateStr then begin
    FStateStr := aStateStr;

    //  DoEvent( WTN_STATE_TEXT_CHANGED, MessageData_CreateString( FStateStr));
  end;
end;

{ Raises an Exception whose message names the failed step (aText) followed,
  on a new line, by the error message the driver reports for FHandle. }
procedure TSQLWaitThread.PGException( const aText: string);
var
  lDriverMessage: string;
begin
  lDriverMessage := FDriver.GetErrorMessage( FHandle);
  raise Exception.CreateFmt( 'Postgre error in "%s":%s%s',
    [ aText, #13#10, lDriverMessage]);
end;

{ Thread body: runs SQL asynchronously on the connection prepared by
  SetConnParams.

  1. Sends the query without waiting for the result.
  2. Polls (every 10 ms) until the server is no longer busy or the thread
     is terminated.
  3. If terminated, sends a cancel request for the running command.
  4. Fetches every pending result set (the SQL text may contain several
     statements) and stores it as text in FQueryResult: fields are separated
     by #9, every row ends with #13#10, and result sets are separated by an
     additional #13#10.

  Raises Exception when SetConnParams was not called or when a driver call
  reports failure. }
procedure TSQLWaitThread.Execute;
const
  // libpq documents 256 bytes as the recommended PQcancel error buffer size
  CANCEL_ERRBUF_SIZE = 256;
var
  lpCancel: PZPostgreSQLCancel;
  lpBuff: string;
  lpResult: PZPostgreSQLResult;
  lpRowCount: integer;
  lpRowIndex: integer;
  lpFieldCount: integer;
  lpFieldIndex: integer;
begin
  // TThread.Execute is abstract, so there is no inherited implementation
  // to call here.

  if FDriver = nil
  then raise Exception.Create( 'SetConnParams must be called before the thread is started');

  // send query
  if FDriver.SendQuery( FHandle, PChar( SQL)) <> 1
  then PGException( 'SendQuery');

  // wait until the result is ready or the thread has been asked to stop
  while not Terminated do begin

    if FDriver.ConsumeInput( FHandle) <> 1
    then PGException( 'ConsumeInput');

    if FDriver.IsBusy( FHandle) <> 1
    then Break;

    Sleep( 10);

  end;

  if Terminated then begin // user pressed the "Cancel" button
    lpCancel := FDriver.GetCancel( FHandle); // allocates the cancel object
    if lpCancel = nil
    then PGException( 'GetCancel');
    try
      // Zero-filled buffer: on failure the cancel call writes its error
      // text here (not into the connection's error message).
      lpBuff := StringOfChar( #0, CANCEL_ERRBUF_SIZE);
      if FDriver.Cancel( lpCancel, PChar( lpBuff), CANCEL_ERRBUF_SIZE) <> 1
      then raise Exception.CreateFmt( 'Postgre error in "%s":%s%s',
        [ 'Cancel', #13#10, string( PChar( lpBuff))]);
    finally
      FDriver.FreeCancel( lpCancel);  // releases the cancel object
    end;
  end;

  FQueryResult := '';
  while True do begin // results may be more than one ( if SQL='select 1; select 2')
    lpResult := FDriver.GetResult( FHandle);
    if lpResult = nil
    then Break;

    try
      if FQueryResult <> ''
      then FQueryResult := FQueryResult + #13#10;

      lpFieldCount := FDriver.GetFieldCount( lpResult);
      lpRowCount := FDriver.GetRowCount( lpResult);

      for lpRowIndex := 0 to lpRowCount - 1 do begin
        // append (never overwrite) so earlier rows and result sets are kept
        for lpFieldIndex := 0 to lpFieldCount - 1 do begin
          if lpFieldIndex > 0
          then FQueryResult := FQueryResult + #9;
          FQueryResult := FQueryResult + FDriver.GetValue( lpResult, lpRowIndex, lpFieldIndex);
        end;
        FQueryResult := FQueryResult + #13#10;
      end;
    finally
      // release the result even if reading it raised an exception
      FDriver.Clear( lpResult);
    end;
  end; // while

end;


I would be grateful for any amendments and comments!

Posted: 19.05.2011, 20:25
by mdaems
Pyh,

Are these API functions available on all Postgres versions >= 7?
In that case I wouldn't mind adding the first part of the code to zeoslib.

Mark

Posted: 20.05.2011, 20:06
by trupka
These useful functions were added in 8.0.

Posted: 30.08.2011, 15:34
by mdaems
Added to SVN testing branch. (Rev 925)
Because of the differences between the 6.X and 7.X plaindrivers this can't be merged back automatically to the stable branch. So I hope I'll not forget to merge it manually using this post later on.

Mark

Posted: 20.12.2011, 09:11
by Pyh
The main advantage of this solution is the ability to cancel your request on the client side without having administrator rights.
I now need to view the data that is returned:

Code: Select all

FDriver.GetResult (FHandle): PZPostgreSQLResult;
Please tell me, what is the easiest way to convert a PZPostgreSQLResult into a TDataset?
(Of course, I will do the conversion in the main thread.)

Posted: 15.03.2012, 00:44
by mdaems
Pyh,
If you only have one resultset, can't you use a TZQuery?
Connect using a TZConnection.
Then set the TZconnection.dbcconnection noticeprocessor.
Then open the ZQuery and read the data.

Wouldn't that work?

Mark